package com.lelebd;

import com.lelebd.bean.LoginEvent;
import com.lelebd.bean.LoginFailWarning;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Detects users who fail to log in N times in a row within a short time span and emits a
 * warning for every such run of failures.
 */
public class LoginFailEnhance {
    /**
     * Builds and runs the job: reads {@code /LoginLog.csv} from the classpath, parses each line
     * into a {@link LoginEvent}, assigns event-time timestamps, keys the stream by user id and
     * prints the warnings produced by {@link LoginFailDetectWarning}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the input resource is missing or the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1. Read the raw login log from the classpath.
        URL resource = LoginFailEnhance.class.getResource("/LoginLog.csv");
        if (resource == null) {
            // getResource returns null for a missing resource; fail with a clear message
            // instead of a NullPointerException on getPath().
            throw new IllegalStateException("Classpath resource /LoginLog.csv not found");
        }
        DataStreamSource<String> loginEventData = env.readTextFile(resource.getPath());
        SingleOutputStreamOperator<LoginEvent> loginEventStream = loginEventData.map(new MapFunction<String, LoginEvent>() {
            @Override
            public LoginEvent map(String value) throws Exception {
                // Expected column order: userId, ip, loginState, timestamp
                String[] vals = value.split(",");
                return new LoginEvent(Long.valueOf(vals[0]), vals[1], vals[2], Long.valueOf(vals[3]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
            @Override
            public long extractTimestamp(LoginEvent element) {
                // The record timestamp is scaled by 1000 (presumably seconds -> milliseconds).
                return element.getTimestamp() * 1000L;
            }
        });

        // 2. Detect consecutive login failures per user with a custom process function.
        loginEventStream
                .keyBy(LoginEvent::getUserId)
                .process(new LoginFailDetectWarning(2))
                .print("连续登录失败预警信息");
        env.execute("登录风控预警");
    }

    /**
     * Keyed process function that emits a {@link LoginFailWarning} as soon as a user has
     * {@code maxFailTimes} consecutive failed logins whose timestamps all lie within
     * {@link #FAIL_WINDOW_SECONDS} of the most recent failure.
     *
     * <p>Detection is driven purely by incoming events; no timers are registered. Any event
     * whose login state is not {@code "fail"} resets the run of failures for that user.
     */
    public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning> {
        /** Maximum timestamp distance (in the unit of the event timestamp) between failures. */
        private static final long FAIL_WINDOW_SECONDS = 2L;

        /** Number of consecutive failures that triggers a warning. */
        private final int maxFailTimes;

        /** Per-key state: the most recent consecutive failures that are still inside the window. */
        private transient ListState<LoginEvent> loginFailEventListState;

        /**
         * @param maxFailTimes number of consecutive failed logins required to raise a warning;
         *                     must be at least 1
         * @throws IllegalArgumentException if {@code maxFailTimes} is less than 1
         */
        public LoginFailDetectWarning(int maxFailTimes) {
            if (maxFailTimes < 1) {
                throw new IllegalArgumentException("maxFailTimes must be >= 1 but was " + maxFailTimes);
            }
            this.maxFailTimes = maxFailTimes;
        }

        /** Registers the list state that holds the recent failed logins of the current key. */
        @Override
        public void open(Configuration parameters) throws Exception {
            loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<>("login-fail", LoginEvent.class));
        }

        /**
         * Processes one login event of the current user.
         *
         * <p>A non-failure clears the stored failures. A failure is appended to the stored
         * failures that are still within the window; if that run now has at least
         * {@code maxFailTimes} entries a warning spanning the run is emitted. Afterwards only the
         * latest {@code maxFailTimes - 1} failures are kept, so the next failure can complete a
         * new run.
         *
         * @param event the incoming login event
         * @param ctx   context giving access to the current key
         * @param out   collector receiving the warnings
         */
        @Override
        public void processElement(LoginEvent event, Context ctx, Collector<LoginFailWarning> out) throws Exception {
            if (!"fail".equals(event.getLoginState())) {
                // Not a failure: the run of failures is broken, start over.
                loginFailEventListState.clear();
                return;
            }

            long currentTs = event.getTimestamp();

            // Collect the earlier failures that are still close enough to the current one.
            List<LoginEvent> recentFails = new ArrayList<>(maxFailTimes);
            Iterable<LoginEvent> stored = loginFailEventListState.get();
            if (stored != null) { // defensive: tolerate a null Iterable for empty state
                for (LoginEvent earlier : stored) {
                    long earlierTs = earlier.getTimestamp();
                    if (currentTs - earlierTs <= FAIL_WINDOW_SECONDS) {
                        recentFails.add(earlier);
                    }
                }
            }
            recentFails.add(event);

            if (recentFails.size() >= maxFailTimes) {
                // Warning arguments: userId, firstFailTime, lastFailTime, warningMsg
                long firstTs = recentFails.get(recentFails.size() - maxFailTimes).getTimestamp();
                out.collect(new LoginFailWarning(
                        ctx.getCurrentKey(),
                        firstTs,
                        currentTs,
                        "2S之内连续登录失败预警"
                ));
            }

            // Whether or not a warning fired, keep only the newest maxFailTimes - 1 failures.
            int keepFrom = Math.max(0, recentFails.size() - (maxFailTimes - 1));
            loginFailEventListState.clear();
            for (LoginEvent kept : recentFails.subList(keepFrom, recentFails.size())) {
                loginFailEventListState.add(kept);
            }
        }
    }
}
